-
Notifications
You must be signed in to change notification settings - Fork 10
feat: configure extra message route to publish validationErrors to Kafka topic #61
Conversation
type ValidationErrorNotifier func(validationError *ValidationError) error | ||
|
||
// ValidationErrorToChanNotifier notifies to a given chan when a ValidationError happens. | ||
func ValidationErrorToChanNotifier(errChan chan *ValidationError) ValidationErrorNotifier { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of this has been removed in favor of using publisher through watermill.io routes.
@@ -60,6 +56,14 @@ func NewProxy(c *ProxyConfig) (proxy.Proxy, error) { | |||
_ = server.Server.Flags().Set("log-level", "debug") | |||
} | |||
|
|||
if c.TLS != nil && c.TLS.Enable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was previously configured through the c ExtraConfig
but now they have been moved into the config directly as it is important to be explicit. In that way, the TLS config can be reused into other parts.
|
||
opts := []kafka.ProxyOption{kafka.WithMessageHandler(handler.ValidateMessage(validator, false))} | ||
|
||
if c.MessageValidation.PublishToKafkaTopic == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the topic is set (from env var atm), then all the re-routing to publish the messages back into Kafka (but another topic) is configured.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! :) Only one question, one suggestion :)
Co-authored-by: Maciej Urbańczyk <urbanczyk.maciej.95@gmail.com>
Description
Fixes #37
This PR adds support for configuring an extra publisher for incoming Kafka messages so they can be published again to another topic after validation (Same kafka cluster, different topic).
Now, validated messages that got a validation error can be re-enqueued to another kafka topic so they can be consumed by someone else and run forensics on their own. In this case, we will be exposing those by default to the websocket port by consuming directly from the second configured topic.
The
asyncapi.yaml
has also changed since the validation errors are now injected to the Metadata (headers) of the Message model, so they can be reused, rerouted, etc.Related issue(s)
#37